/*
 * TDMetric.actor.h
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

// When actually compiled (NO_INTELLISENSE), include the generated version of this file.  In intellisense use the source version.
#if defined(NO_INTELLISENSE) && !defined(FLOW_TDMETRIC_ACTOR_G_H)
        #define FLOW_TDMETRIC_ACTOR_G_H
        #include "flow/TDMetric.actor.g.h"
#elif !defined(FLOW_TDMETRIC_ACTOR_H)
        #define FLOW_TDMETRIC_ACTOR_H

#include "flow/flow.h"
#include "flow/IndexedSet.h"
#include "flow/network.h"
#include "flow/Knobs.h"
#include "flow/genericactors.actor.h"
#include "flow/CompressedInt.h"
#include <algorithm>
#include <functional>
#include "flow/actorcompiler.h"  // This must be the last #include.

struct MetricNameRef {
	MetricNameRef() {}
	MetricNameRef(const StringRef& type, const StringRef& name, const StringRef &id)
	  : type(type), name(name), id(id) {
	}
	MetricNameRef(Arena& a, const MetricNameRef& copyFrom)
	  : type(a, copyFrom.type), name(a, copyFrom.name), id(a, copyFrom.id) {
	}

	StringRef type, name, id;

	std::string toString() const {
		return format("(%s,%s,%s,%s)", type.toString().c_str(), name.toString().c_str(), id.toString().c_str());
	}

	int expectedSize() const {
		return type.expectedSize() + name.expectedSize();
	}
};

extern std::string reduceFilename(std::string const &filename);
inline bool operator < (const MetricNameRef& l, const MetricNameRef& r ) {
	int cmp = l.type.compare(r.type);
	if(cmp == 0) {
		cmp = l.name.compare(r.name);
		if(cmp == 0)
			cmp = l.id.compare(r.id);
	}
	return cmp < 0;
}

inline bool operator == (const MetricNameRef& l, const MetricNameRef& r ) {
	return l.type == r.type && l.name == r.name && l.id == r.id;
}

inline bool operator != (const MetricNameRef& l, const MetricNameRef& r ) {
	return !(l == r);
}

struct KeyWithWriter {
	Standalone<StringRef> key;
	BinaryWriter writer;
	int writerOffset;

	KeyWithWriter( Standalone<StringRef> const& key, BinaryWriter& writer, int writerOffset = 0) : key(key), writer(std::move(writer)), writerOffset(writerOffset) {}
	KeyWithWriter( KeyWithWriter&& r ) : key(std::move(r.key)), writer(std::move(r.writer)), writerOffset(r.writerOffset) {}
	void operator=( KeyWithWriter&& r ) { key = std::move(r.key); writer = std::move(r.writer); writerOffset = r.writerOffset; }

	StringRef value() {
		return StringRef(writer.toStringRef().substr(writerOffset));
	}
};

// This is a very minimal interface for getting metric data from the DB which is needed
// to support continuing existing metric data series.
// It's lack of generality is intentional.
class IMetricDB {
public:
	virtual ~IMetricDB() {}

	// key should be the result of calling metricKey or metricFieldKey with time = 0
	virtual Future<Optional<Standalone<StringRef>>> getLastBlock(Standalone<StringRef> key) = 0;
};

// Key generator for metric keys for various things.
struct MetricKeyRef {
	MetricKeyRef() : level(-1) {}
	MetricKeyRef(Arena& a, const MetricKeyRef& copyFrom)
	  : prefix(a, copyFrom.prefix), name(a, copyFrom.name), address(a, copyFrom.address),
	    fieldName(a, copyFrom.fieldName), fieldType(a, copyFrom.fieldType), level(copyFrom.level) {
	}

	StringRef prefix;
	MetricNameRef name;
	StringRef address;
	StringRef fieldName;
	StringRef fieldType;
	uint64_t level;

	int expectedSize() const {
		return prefix.expectedSize() + name.expectedSize() + address.expectedSize() + fieldName.expectedSize() + fieldType.expectedSize();
	}

	template <typename T> inline MetricKeyRef withField(const T &field) const {
		MetricKeyRef mk(*this);
		mk.fieldName = field.name();
		mk.fieldType = field.typeName();
		return mk;
	}

	const Standalone<StringRef> packLatestKey() const;
	const Standalone<StringRef> packDataKey(int64_t time = -1) const;
	const Standalone<StringRef> packFieldRegKey() const;

	bool isField() const { return fieldName.size() > 0 && fieldType.size() > 0; }
	void writeField(BinaryWriter &wr) const;
	void writeMetricName(BinaryWriter &wr) const;
};

struct MetricUpdateBatch {
	std::vector<KeyWithWriter> inserts;
	std::vector<KeyWithWriter> appends;
	std::vector<std::pair<Standalone<StringRef>,Standalone<StringRef>>> updates;
	std::vector<std::function<Future<Void>(IMetricDB *, MetricUpdateBatch *)>> callbacks;

	void clear() {
		inserts.clear();
		appends.clear();
		updates.clear();
		callbacks.clear();
	}
};

template<typename T>
inline const StringRef metricTypeName() {
	// If this function does not compile then T is not a supported metric type
	return T::metric_field_type();
}
#define MAKE_TYPENAME(T, S) template<> inline const StringRef metricTypeName<T>() { return LiteralStringRef(S); }
MAKE_TYPENAME(bool, "Bool")
MAKE_TYPENAME(int64_t, "Int64")
MAKE_TYPENAME(double, "Double")
MAKE_TYPENAME(Standalone<StringRef>, "String")
#undef MAKE_TYPENAME

struct BaseMetric;

// The collection of metrics that exist for a single process, at a single address.
class TDMetricCollection {
public:
	TDMetricCollection() : currentTimeBytes(0) {}

	// Metric Name to reference to its instance
	Map<Standalone<MetricNameRef>, Reference<BaseMetric>, MapPair<Standalone<MetricNameRef>, Reference<BaseMetric>>, int> metricMap;

	AsyncTrigger metricAdded;
	AsyncTrigger metricEnabled;
	AsyncTrigger metricRegistrationChanged;

	// Initialize the collection.  Once this returns true, metric data can be written to a database.  Note that metric data can be logged
	// before that time, just not written to a database.
	bool init() {
		// Get and store the local address in the metric collection, but only if it is not 0.0.0.0:0
		if( address.size() == 0 ) {
			NetworkAddress addr = g_network->getLocalAddress();
			if(addr.ip != 0 && addr.port != 0)
				address = StringRef(addr.toString());
		}
		return address.size() != 0;
	}

	// Returns the TDMetrics that the calling process should use
	static TDMetricCollection* getTDMetrics() {
		if(g_network == nullptr)
			return nullptr;
		return static_cast<TDMetricCollection*>((void*) g_network->global(INetwork::enTDMetrics));
	}

	Deque<uint64_t> rollTimes;
	int64_t currentTimeBytes;
	Standalone<StringRef> address;

	void checkRoll(uint64_t t, int64_t usedBytes);
	bool canLog(int level);
};

struct MetricData {
	uint64_t start;
	uint64_t rollTime;
	uint64_t appendStart;
	BinaryWriter writer;

	explicit MetricData(uint64_t appendStart = 0) :
		writer(AssumeVersion(currentProtocolVersion)),
		start(0),
		rollTime(std::numeric_limits<uint64_t>::max()),
		appendStart(appendStart) {
	}

	MetricData( MetricData&& r ) noexcept(true) :
		start(r.start),
		rollTime(r.rollTime),
		appendStart(r.appendStart),
		writer(std::move(r.writer)) {
	}

	void operator=( MetricData&& r ) noexcept(true) {
		start = r.start; rollTime = r.rollTime; appendStart = r.appendStart; writer = std::move(r.writer);
	}

	std::string toString();
};

// Some common methods to reduce code redundancy across different metric definitions
template<typename T, typename _ValueType = Void>
struct MetricUtil {
	typedef _ValueType ValueType;
	typedef T MetricType;

	// Looks up a metric by name and id and returns a reference to it if it exists.
	// Empty names will not be looked up.
	// If create is true then a metric will be created with the given initial value if one could not be found to return.
	// If a metric is created and name is not empty then the metric will be placed in the collection.
	static Reference<T> getOrCreateInstance(StringRef const& name, StringRef const &id = StringRef(), bool create = false, ValueType initial = ValueType()) {
		Reference<T> m;
		TDMetricCollection *collection = TDMetricCollection::getTDMetrics();

		// If there is a metric collect and this metric has a name then look it up in the collection
		bool useMap = collection != nullptr && name.size() > 0;
		MetricNameRef mname;

		if(useMap) {
			mname = MetricNameRef(T::metricType, name, id);
			auto mi = collection->metricMap.find(mname);
			if(mi != collection->metricMap.end()) {
				m = mi->value.castTo<T>();
			}
		}

		// If we don't have a valid metric reference yet and the create flag was set then create one and possibly put it in the map
		if(!m && create) {
			// Metric not found in collection but create is set then create it in the map
			m = Reference<T>(new T(mname, initial));
			if(useMap) {
				collection->metricMap[mname] = m.template castTo<BaseMetric>();
				collection->metricAdded.trigger();
			}
		}

		return m;
	}

	// Lookup the T metric by name and return its value (or nullptr if it doesn't exist)
	static T * lookupMetric(MetricNameRef const &name) {
		auto it = T::metricMap().find(name);
		if(it != T::metricMap().end())
			return it->value;
		return nullptr;
	}
};

// index_sequence implementation since VS2013 doesn't have it yet
template <size_t... Ints> class index_sequence {
public:
	static size_t size() { return sizeof...(Ints); }
};

template <size_t Start, typename Indices, size_t End>
struct make_index_sequence_impl;

template <size_t Start, size_t... Indices, size_t End>
struct make_index_sequence_impl<Start, index_sequence<Indices...>, End> {
	typedef typename make_index_sequence_impl<
		Start + 1, index_sequence<Indices..., Start>, End>::type type;
};

template <size_t End, size_t... Indices>
struct make_index_sequence_impl<End, index_sequence<Indices...>, End> {
	typedef index_sequence<Indices...> type;
};

// The code that actually implements tuple_map
template <size_t I, typename F, typename... Tuples>
auto tuple_zip_invoke(F f, const Tuples &... ts) -> decltype( f(std::get<I>(ts)...) ) {
	return f(std::get<I>(ts)...);
}

template <typename F, size_t... Is, typename... Tuples>
auto tuple_map_impl(F f, index_sequence<Is...>, const Tuples &... ts) -> decltype( std::make_tuple(tuple_zip_invoke<Is>(f, ts...)...) ) {
	return std::make_tuple(tuple_zip_invoke<Is>(f, ts...)...);
}

// tuple_map( f(a,b), (a1,a2,a3), (b1,b2,b3) ) = (f(a1,b1), f(a2,b2), f(a3,b3))
template <typename F, typename Tuple, typename... Tuples>
auto tuple_map(F f, const Tuple &t, const Tuples &... ts) -> decltype( tuple_map_impl(f, typename make_index_sequence_impl<0, index_sequence<>, std::tuple_size<Tuple>::value>::type(), t, ts...) ) {
	return tuple_map_impl(f, typename make_index_sequence_impl<0, index_sequence<>, std::tuple_size<Tuple>::value>::type(), t, ts...);
}

template <class T>
struct Descriptor {};

// FieldHeader is a serializable (FIXED SIZE!) and updatable Header type for Metric field levels.
// Update is via += with either a T or another FieldHeader
// Default implementation is sufficient for ints and doubles
template<typename T>
struct FieldHeader {
	FieldHeader() : version(1), count(0), sum(0) {}
	uint8_t version;
	int64_t count;
	// sum is a T if T is arithmetic, otherwise it's an int64_t
	typename std::conditional<std::is_floating_point<T>::value, double, int64_t>::type sum;

	void update(FieldHeader const &h) {
		count += h.count;
		sum += h.sum;
	}
	void update(T const &v) {
		++count;
		sum += v;
	}
	template<class Ar> void serialize(Ar &ar) {
		ar & version;
		ASSERT(version == 1);
		ar & count & sum;
	}
};

template <> inline void FieldHeader<Standalone<StringRef>>::update(Standalone<StringRef> const &v) {
	++count;
	sum += v.size();
}

// FieldValueBlockEncoding is a class for reading and writing encoded field values to and from field
// value data blocks.  Note that an implementation can be stateful.
// Proper usage requires that a single Encoding instance is used to either write all field values to a metric
// data block or to read all field values from a metric value block.  This usage pattern enables enables
// encoding and decoding values as deltas from previous values.
//
// The default implementation works for ints and writes delta from the previous value.
template <typename T>
struct FieldValueBlockEncoding {
	FieldValueBlockEncoding() : prev(0) {}
	inline void write(BinaryWriter &w, T v) {
		w << CompressedInt<T>(v - prev);
		prev = v;
	}
	T read(BinaryReader &r) {
		CompressedInt<T> v;
		r >> v;
		prev += v.value;
		return prev;
	}
	T prev;
};

template <>
struct FieldValueBlockEncoding<double> {
	inline void write(BinaryWriter &w, double v) {
		w << v;
	}
	double read(BinaryReader &r) {
		double v;
		r >> v;
		return v;
	}
};

template <>
struct FieldValueBlockEncoding<bool> {
	inline void write(BinaryWriter &w, bool v) {
		w.serializeBytes( v ? LiteralStringRef("\x01") : LiteralStringRef("\x00") );
	}
	bool read(BinaryReader &r) {
		uint8_t *v = (uint8_t *)r.readBytes(sizeof(uint8_t));
		return *v != 0;
	}
};

// Encoder for strings, writes deltas
template <>
struct FieldValueBlockEncoding<Standalone<StringRef>> {
	inline void write(BinaryWriter &w, Standalone<StringRef> const &v) {
		int reuse = 0;
		int stop = std::min(v.size(), prev.size());
		while(reuse < stop && v[reuse] == prev[reuse])
			++reuse;
		w << CompressedInt<int>(reuse) << CompressedInt<int>(v.size() - reuse);
		if(v.size() > reuse)
			w.serializeBytes(v.substr(reuse));
		prev = v;
	}
	Standalone<StringRef> read(BinaryReader &r) {
		CompressedInt<int> reuse;
		CompressedInt<int> extra;
		r >> reuse >> extra;
		ASSERT(reuse.value >= 0 && extra.value >= 0 && reuse.value <= prev.size());
		Standalone<StringRef> v = makeString(reuse.value + extra.value);
		memcpy(mutateString(v),               prev.begin(),             reuse.value);
		memcpy(mutateString(v) + reuse.value, r.readBytes(extra.value), extra.value);
		prev = v;
		return v;
	}
	// Using a Standalone<StringRef> for prev is efficient for writing but not great for reading.
	Standalone<StringRef> prev;
};


// Field level for value type of T using header type of Header.  Default header type is the default FieldHeader implementation for type T.
template <class T, class Header = FieldHeader<T>, class Encoder = FieldValueBlockEncoding<T>>
struct FieldLevel {

	Deque<MetricData> metrics;
	int64_t appendUsed;
	Header header;

	// The previous header and the last timestamp at which an out going MetricData block requires header patching
	Optional<Header> previousHeader;
	uint64_t lastTimeRequiringHeaderPatch;

	Encoder enc;

	explicit FieldLevel() : appendUsed(0) {
		metrics.emplace_back(MetricData());
		metrics.back().writer << header;
	}

	FieldLevel(FieldLevel &&f)
	  : metrics(std::move(f.metrics)), appendUsed(f.appendUsed), enc(f.enc), header(f.header),
	    previousHeader(f.previousHeader), lastTimeRequiringHeaderPatch(f.lastTimeRequiringHeaderPatch) {
	}

	// update Header, use Encoder to write T v
	void log( T v, uint64_t t, bool& overflow, int64_t& bytes ) {
		int lastLength = metrics.back().writer.getLength();
		if( metrics.back().start == 0 )
			metrics.back().start = t;

		header.update(v);
		enc.write(metrics.back().writer, v);

		bytes += metrics.back().writer.getLength() - lastLength;
		if(lastLength + appendUsed > FLOW_KNOBS->MAX_METRIC_SIZE)
			overflow = true;
	}

	void nextKey( uint64_t t ) {
		// If nothing has actually been written to the current block, don't add a new block,
		// just modify this one if needed so that the next log call will set the ts for this block.
		auto &m = metrics.back();
		if(m.start == 0 && m.appendStart == 0)
			return;

		// This block would have appended but had no data so just reset it to a non-append block instead of adding a new one
		if(m.appendStart != 0 && m.writer.getLength() == 0) {
			m.appendStart = 0;
			m.writer << header;
			enc = Encoder();
			return;
		}

		metrics.back().rollTime = t;
		metrics.emplace_back(MetricData());
		metrics.back().writer << header;
		enc = Encoder();
		appendUsed = 0;
	}

	void rollMetric( uint64_t t ) {
		ASSERT(metrics.size());

		if(metrics.back().start) {
			metrics.back().rollTime = t;
			appendUsed += metrics.back().writer.getLength();
			if(metrics.back().appendStart)
				metrics.emplace_back(MetricData(metrics.back().appendStart));
			else
				metrics.emplace_back(MetricData(metrics.back().start));
		}
	}

	// Calculate header as of the end of a value block
	static Header calculateHeader(StringRef block) {
		BinaryReader r(block, AssumeVersion(currentProtocolVersion));
		Header h;
		r >> h;
		Encoder dec;
		while(!r.empty()) {
			T v = dec.read(r);
			h.update(v);
		}
		return h;
	}

	// Read header at position, update it with previousHeader, overwrite old header with new header.
	static void updateSerializedHeader(StringRef buf, const Header &patch) {
		BinaryReader r(buf, AssumeVersion(currentProtocolVersion));
		Header h;
		r >> h;
		h.update(patch);
		OverWriter w(mutateString(buf), buf.size(), AssumeVersion(currentProtocolVersion));
		w << h;
	}

	// Flushes data blocks in metrics to batch, optionally patching headers if a header is given
	void flushUpdates(MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
		while(metrics.size()) {
			auto& data = metrics.front();

			if(data.start != 0 && data.rollTime <= rollTime) {
				// If this data is to be appended, write it to the batch now.
				if( data.appendStart ) {
					batch.appends.push_back(KeyWithWriter(mk.packDataKey(data.appendStart), data.writer));
				} else {
					// Otherwise, insert but first, patch the header if this block is old enough
					if(data.rollTime <= lastTimeRequiringHeaderPatch) {
						ASSERT(previousHeader.present());
						FieldLevel<T>::updateSerializedHeader(data.writer.toStringRef(), previousHeader.get());
					}

					batch.inserts.push_back(KeyWithWriter(mk.packDataKey(data.start), data.writer));
				}

				if(metrics.size() == 1) {
					rollMetric(data.rollTime);
					metrics.pop_front();
					break;
				}

				metrics.pop_front();
			}
			else
				break;
		}
	}

	ACTOR static Future<Void> updatePreviousHeader(FieldLevel *self, IMetricDB *db, Standalone<MetricKeyRef> mk, uint64_t rollTime, MetricUpdateBatch *batch) {

		Optional<Standalone<StringRef>> block = wait(db->getLastBlock(mk.packDataKey(-1)));

		// If the block is present, use it
		if(block.present()) {
			// Calculate the previous data's final header value
			Header oldHeader = calculateHeader(block.get());

			// Set the previous header in self to this header for us in patching outgoing blocks
			self->previousHeader = oldHeader;

			// Update the header in self so the next new block created will be current
			self->header.update(oldHeader);

			// Any blocks already in the metrics queue will need to be patched at the time that they are
			// flushed to the DB (which isn't necessarity part of the current flush) so set the last time
			// that requires a patch to the time of the last MetricData in the queue
			self->lastTimeRequiringHeaderPatch = self->metrics.back().rollTime;
		}
		else {
			// Otherwise, there is no previous header so no headers need to be updated at all ever.
			// Set the previous header to an empty header so that flush() sees that this process
			// has already finished, and set lastTimeRequiringHeaderPatch to 0 since no blocks ever need to be patched.
			self->previousHeader = Header();
			self->lastTimeRequiringHeaderPatch = 0;
		}

		// Now flush the level data up to the rollTime argument and patch anything older than lastTimeRequiringHeaderPatch
		self->flushUpdates(mk, rollTime, *batch);

		return Void();
	}

	// Flush this level's data to the output batch.
	// This function must NOT be called again until any callbacks added to batch have been completed.
	void flush(const MetricKeyRef &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
		// Don't do anything if there is no data in the queue to flush.
		if(metrics.empty() || metrics.front().start == 0)
			return;

		// If the previous header is present then just call flushUpdates now.
		if(previousHeader.present())
			return flushUpdates(mk, rollTime, batch);

		Standalone<MetricKeyRef> mkCopy = mk;

		// Previous header is not present so queue a callback which will update it
		batch.callbacks.push_back([=](IMetricDB *db, MetricUpdateBatch *batch) mutable -> Future<Void> {
			return updatePreviousHeader(this, db, mkCopy, rollTime, batch);
		});

	}
};

// A field Description to be used for continuous metrics, whose field name and type should never be accessed
struct NullDescriptor {
	static StringRef name() { return StringRef(); }
};

// Descriptor must have the methods name() and typeName().  They can be either static or member functions (such as for runtime configurability).
// Descriptor is inherited so that syntatically Descriptor::fn() works in either case and so that an empty Descriptor with static methods
// will take up 0 space.  EventField() accepts an optional Descriptor instance.
template <class T, class Descriptor = NullDescriptor, class FieldLevelType = FieldLevel<T>>
struct EventField : public Descriptor {
	std::vector<FieldLevelType> levels;

	EventField( EventField&& r ) noexcept(true) : Descriptor(r), levels(std::move(r.levels)) {}

	void operator=( EventField&& r ) noexcept(true) {
		levels = std::move(r.levels);
	}

	EventField(Descriptor d = Descriptor()) : Descriptor(d) {
	}

	static StringRef typeName() { return metricTypeName<T>(); }

	void init() {
		if(levels.size() != FLOW_KNOBS->MAX_METRIC_LEVEL) {
			levels.clear();
			levels.resize(FLOW_KNOBS->MAX_METRIC_LEVEL);
		}
	}

	void log( T v, uint64_t t, int64_t l, bool& overflow, int64_t& bytes ) {
		return levels[l].log(v, t, overflow, bytes);
	}

	void nextKey( uint64_t t, int level ) {
		levels[level].nextKey(t);
	}

	void nextKeyAllLevels( uint64_t t ) {
		for(int64_t i = 0; i < FLOW_KNOBS->MAX_METRIC_LEVEL; i++)
			nextKey(t, i);
	}

	void rollMetric( uint64_t t ) {
		for(int i = 0; i < levels.size(); i++) {
			levels[i].rollMetric(t);
		}
	}

	void flushField(MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
		MetricKeyRef fk = mk.withField(*this);
		for(int j = 0; j < levels.size(); ++j) {
			fk.level = j;
			levels[j].flush(fk, rollTime, batch);
		}
	}

	// Writes and Event metric field registration key
	void registerField( const MetricKeyRef &mk, std::vector<Standalone<StringRef>>& fieldKeys ) {
		fieldKeys.push_back(mk.withField(*this).packFieldRegKey());
	}
};

struct MakeEventField {
	template <class Descriptor>
	EventField<typename Descriptor::type, Descriptor> operator() (Descriptor) { return EventField<typename Descriptor::type, Descriptor>(); }
};

struct TimeDescriptor {
	static StringRef name() { return LiteralStringRef("Time"); }
};

struct BaseMetric {
	BaseMetric(MetricNameRef const &name) : metricName(name), pCollection(nullptr), registered(false), enabled(false) {
		setConfig(false);
	}
	virtual ~BaseMetric() {
	}

	virtual void addref() = 0;
	virtual void delref() = 0;

	virtual void rollMetric(uint64_t t) = 0;

	virtual void flushData(const MetricKeyRef &mk, uint64_t rollTime, MetricUpdateBatch &batch) = 0;
	virtual void registerFields(const MetricKeyRef &mk, std::vector<Standalone<StringRef>>& fieldKeys) {};

	// Set the metric's config.  An assert will fail if the metric is enabled before the metrics collection is available.
	void setConfig(bool enable, int minLogLevel = 0) {
		bool wasEnabled = enabled;
		enabled = enable;
		minLevel = minLogLevel;

		if(enable && pCollection == nullptr) {
			pCollection = TDMetricCollection::getTDMetrics();
			ASSERT(pCollection != nullptr);
		}

		if(wasEnabled != enable) {
			if(enabled) {
				onEnable();
				pCollection->metricEnabled.trigger();
			}
			else
				onDisable();
		}
	}

	// Callbacks for when metric is Enabled or Disabled.
	// Metrics should verify their underlying storage on Enable because they could have been initially created
	// at a time when the knobs were not initialized.
	virtual void onEnable() = 0;
	virtual void onDisable() {};

	// Combines checking this metric's configured minimum level and any collection-wide throttling
	// This should only be called after it is determined that a metric is enabled.
	bool canLog(int level) {
		return level >= minLevel && pCollection->canLog(level);
	}

	Standalone<MetricNameRef> metricName;

	bool enabled;  // The metric is currently logging data
	int minLevel;  // The minimum level that will be logged.

	// All metrics need a pointer to their collection for performance reasons - every time a data point is logged
	// canLog must be called which uses the collection's canLog to decide based on the metric write queue.
	TDMetricCollection *pCollection;

	// The metric has been registered in its current form (some metrics can change and require re-reg)
	bool registered;
};

struct BaseEventMetric : BaseMetric {

	BaseEventMetric(MetricNameRef const &name) : BaseMetric(name) {
	}

	// Needed for MetricUtil
	static const StringRef metricType;
	Void getValue() const {
		return Void();
	}
	virtual ~BaseEventMetric() {}

	// Every metric should have a set method for its underlying type in order for MetricUtil::getOrCreateInstance
	// to initialize it.  In the case of event metrics there is no underlying type so the underlying type
	// is Void and set does nothing.
	void set(Void const &val) {}

	virtual StringRef getTypeName() = 0;
};

template <class E>
struct EventMetric : E, ReferenceCounted<EventMetric<E>>, MetricUtil<EventMetric<E>>, BaseEventMetric {
	EventField<int64_t, TimeDescriptor> time;
	bool latestRecorded;
	decltype( tuple_map( MakeEventField(), typename Descriptor<E>::fields() ) ) values;

	virtual void addref() { ReferenceCounted<EventMetric<E>>::addref(); }
	virtual void delref() { ReferenceCounted<EventMetric<E>>::delref(); }

	EventMetric( MetricNameRef const &name, Void) : BaseEventMetric(name), latestRecorded(false) {
	}

	virtual ~EventMetric() {
	}

	virtual StringRef getTypeName() { return Descriptor<E>::typeName(); }

	void onEnable() {
		// Must initialize fields, previously knobs may not have been set.
		time.init();
		initFields( typename Descriptor<E>::field_indexes());
	}

	// Log the event.
	// Returns the time that was logged for the event so that it can be passed to other events that need to be time-sync'd.
	// NOTE:  Do NOT use the same time for two consecutive loggings of the SAME event.  This *could* cause there to be metric data
	// blocks such that the last timestamp of one block is equal to the first timestamp of the next, which means if a search is done
	// for the exact timestamp then the first event will not be found.
	uint64_t log(uint64_t explicitTime = 0) {
		if(!enabled)
			return 0;

		uint64_t t = explicitTime ? explicitTime : timer_int();
		double x = g_random->random01();

		int64_t l = 0;
		if (x == 0.0)
			l = FLOW_KNOBS->MAX_METRIC_LEVEL-1;
		else
			l = std::min(FLOW_KNOBS->MAX_METRIC_LEVEL-1, (int64_t)(::log(1.0/x) / FLOW_KNOBS->METRIC_LEVEL_DIVISOR));

		if(!canLog(l))
			return 0;

		bool overflow = false;
		int64_t bytes = 0;
		time.log(t, t, l, overflow, bytes);
		logFields( typename Descriptor<E>::field_indexes(), t, l, overflow, bytes );
		if(overflow) {
			time.nextKey(t, l);
			nextKeys(typename Descriptor<E>::field_indexes(), t, l);
		}
		latestRecorded = false;
		return t;
	}

	template <size_t... Is>
	void logFields(index_sequence<Is...>, uint64_t t, int64_t l, bool& overflow, int64_t& bytes) {
		auto _ = {
			(std::get<Is>(values).log( std::tuple_element<Is, typename Descriptor<E>::fields>::type::get( static_cast<E&>(*this) ), t, l, overflow, bytes ), Void())...
		};
	}

	template <size_t... Is>
	void initFields(index_sequence<Is...>) {
		auto _ = {
			(std::get<Is>(values).init(), Void())...
		};
	}

	template <size_t... Is>
	void nextKeys(index_sequence<Is...>, uint64_t t, int64_t l ) {
		auto _ = {
			(std::get<Is>(values).nextKey(t, l),Void())...
		};
	}

	virtual void flushData(MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
		time.flushField( mk, rollTime, batch );
		flushFields( typename Descriptor<E>::field_indexes(), mk, rollTime, batch );
		if(!latestRecorded) {
			batch.updates.push_back(std::make_pair(mk.packLatestKey(), StringRef()));
			latestRecorded = true;
		}
	}

	template <size_t... Is>
	void flushFields(index_sequence<Is...>, MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch ) {
		auto _ = {
			(std::get<Is>(values).flushField( mk, rollTime, batch ),Void())...
		};
	}

	virtual void rollMetric( uint64_t t ) {
		time.rollMetric(t);
		rollFields( typename Descriptor<E>::field_indexes(), t );
	}

	template <size_t... Is>
	void rollFields(index_sequence<Is...>, uint64_t t ) {
		auto _ = {
			(std::get<Is>(values).rollMetric( t ),Void())...
		};
	}

	virtual void registerFields( MetricKeyRef const &mk, std::vector<Standalone<StringRef>>& fieldKeys ) {
		time.registerField( mk, fieldKeys );
		registerFields( typename Descriptor<E>::field_indexes(), mk, fieldKeys );
	}

	template <size_t... Is>
	void registerFields(index_sequence<Is...>, const MetricKeyRef &mk, std::vector<Standalone<StringRef>>& fieldKeys ) {
		auto _ = {
			(std::get<Is>(values).registerField( mk, fieldKeys ),Void())...
		};
	}
protected:
    bool it;
};

// A field Descriptor compatible with EventField but with name set at runtime
struct DynamicDescriptor {
	DynamicDescriptor(const char *name)
	  : _name(StringRef((uint8_t *)name, strlen(name))) {}
	StringRef name() const { return _name; }

private:
	const Standalone<StringRef> _name;
};

template<typename T>
struct DynamicField;

struct DynamicFieldBase {
	virtual ~DynamicFieldBase() {}

	virtual StringRef fieldName() = 0;
	virtual const StringRef getDerivedTypeName() = 0;
	virtual void init() = 0;
	virtual void clear() = 0;
	virtual void log(uint64_t t, int64_t l, bool& overflow, int64_t& bytes ) = 0;
	virtual void nextKey( uint64_t t, int level ) = 0;
	virtual void nextKeyAllLevels( uint64_t t) = 0;
	virtual void rollMetric( uint64_t t ) = 0;
	virtual void flushField( MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch) = 0;
	virtual void registerField( MetricKeyRef const &mk, std::vector<Standalone<StringRef>>& fieldKeys ) = 0;

	// Set the current value of this field from the value of another
	virtual void setValueFrom(DynamicFieldBase *src, StringRef eventType) = 0;

	// Create a new field of the same type and with the same current value as this one and with the given name
	virtual DynamicFieldBase * createNewWithValue(const char *name) = 0;

	// This does a fairly cheap and "safe" downcast without using dynamic_cast / RTTI by checking that the pointer value
	// of the const char * type string is the same as getDerivedTypeName for this object.
	template<typename T> DynamicField<T> * safe_downcast(StringRef eventType) {
		if(getDerivedTypeName() == metricTypeName<T>())
			return (DynamicField<T> *)this;

		TraceEvent(SevWarnAlways, "ScopeEventFieldTypeMismatch")
			.detail("EventType", eventType.toString())
			.detail("FieldName", fieldName().toString())
			.detail("OldType", getDerivedTypeName().toString())
			.detail("NewType", metricTypeName<T>().toString());
		return NULL;
	}
};

template<typename T>
struct DynamicField : public DynamicFieldBase, EventField<T, DynamicDescriptor> {
	typedef EventField<T, DynamicDescriptor> EventFieldType;
	DynamicField(const char *name) : DynamicFieldBase(), EventFieldType(DynamicDescriptor(name)), value(T()) {}
	virtual ~DynamicField() {}

	StringRef fieldName() { return EventFieldType::name(); }

	// Get the field's datatype, this is used as a form of RTTI by DynamicFieldBase::safe_downcast()
	const StringRef getDerivedTypeName() { return metricTypeName<T>(); }

	// Pure virtual implementations
	void clear() { value = T(); }

	void log(uint64_t t, int64_t l, bool& overflow, int64_t& bytes) {
		return EventFieldType::log(value, t, l, overflow, bytes);
	}

	// Redirects to EventFieldType methods
	void nextKey( uint64_t t, int level ) {
		return EventFieldType::nextKey(t, level);
	}
	void nextKeyAllLevels( uint64_t t) {
		return EventFieldType::nextKeyAllLevels(t);
	}
	void rollMetric( uint64_t t ) {
		return EventFieldType::rollMetric(t);
	}
	void flushField(MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
		return EventFieldType::flushField(mk, rollTime, batch);
	}
	void registerField(MetricKeyRef const &mk, std::vector<Standalone<StringRef>>& fieldKeys) {
		return EventFieldType::registerField(mk, fieldKeys);
	}
	void init() {
		return EventFieldType::init();
	}

	// Set this field's value to the value of another field of exactly the same type.
	void setValueFrom(DynamicFieldBase *src, StringRef eventType) {
		DynamicField<T> *s = src->safe_downcast<T>(eventType);
		if(s != NULL)
			set(s->value);
		else
			clear();  // Not really necessary with proper use but just in case it is better to clear than use an old value.
	}

	DynamicFieldBase * createNewWithValue(const char *name) {
		DynamicField<T> *n = new DynamicField<T>(name);
		n->set(value);
		return n;
	}

	// Non virtuals
	void set(T val) { value = val; }

private:
	T value;
};

// A DynamicEventMetric is an EventMetric whose field set can be modified at runtime.
struct DynamicEventMetric : ReferenceCounted<DynamicEventMetric>, MetricUtil<DynamicEventMetric>, BaseEventMetric {
private:
	EventField<int64_t, TimeDescriptor> time;
	bool latestRecorded;

	// TODO:  A Standalone key type isn't ideal because on lookups a ref will be made Standalone just for the search
	// All fields that are set with setField will be in fields.
	std::map<Standalone<StringRef>, DynamicFieldBase *> fields;

	// Set of fields not yet registered
	std::set<Standalone<StringRef> > fieldsToRegister;

	// Whether or not new fields have been added since the last logging.  fieldsToRegister can't
	// be used for this because registration is independent of actually logging data.
	bool newFields;

	void newFieldAdded(Standalone<StringRef> const &fname) {
		fieldsToRegister.insert(fname);  // So that this field will be registered when asked by the metrics logger actor later
		newFields = true;                // So that log() will know that there is a new field

		// Registration has now changed so set registered to false and trigger a reg change event if possible
		registered = false;
		if(pCollection != nullptr)
			pCollection->metricRegistrationChanged.trigger();
	}

public:
	DynamicEventMetric(MetricNameRef const &name, Void = Void());
    ~DynamicEventMetric();

	virtual void addref() { ReferenceCounted<DynamicEventMetric>::addref(); }
	virtual void delref() { ReferenceCounted<DynamicEventMetric>::delref(); }

	void onEnable() {
		// Must initialize fields, previously knobs may not have been set.
		// Note that future fields will be okay because the field constructor will init and the knobs will be set.
		time.init();
		for(auto f : fields)
			f.second->init();
	}

	// Set (or create) a new field in the event
	template<typename ValueType>
	void setField(const char *fieldName, const ValueType &value) {
		StringRef fname((uint8_t *)fieldName, strlen(fieldName));
		DynamicFieldBase *&p = fields[fname];
		if (p != NULL) {
			// FIXME:  This will break for DynamicEventMetric instances that are reused, such as use cases outside
			// of TraceEvents.  Currently there are none in the code, and there may never any be but if you're here
			// because you reused a DynamicEventMetric and got the error below then this issue must be fixed.  One
			// possible solution is to have a flag in DynamicEventMetric which enables this check so that
			// TraceEvent can preserve this behavior.
			TraceEvent(SevError, "DuplicateTraceProperty").detail("Property", fieldName).backtrace();
			if (g_network->isSimulated()) ASSERT(false);
		}
		p = new DynamicField<ValueType>(fieldName);
		if(pCollection != nullptr)
			p->init();
		newFieldAdded(fname);

		// This will return NULL if the datatype is wrong.
		DynamicField<ValueType> *f = p->safe_downcast<ValueType>(getTypeName());
		// Only set the field value if the type is correct.
		// Another option here is to redefine the field to the new type and flush (roll) the existing field but that would create many keys
		// with small values in the db if two frequent events keep tug-of-war'ing the types back and forth.
		if(f != NULL)
			f->set(value);
		else
			p->clear();  // Not really necessary with proper use but just in case it is better to clear than use an old value.
	}

	// This provides a way to first set fields in a temporary DynamicEventMetric and then push all of those field values
	// into another DynamicEventMetric (which is actually logging somewhere) and log the event.
	uint64_t setFieldsAndLogFrom(DynamicEventMetric *source, uint64_t explicitTime = 0) {
		for(auto f : source->fields)
		{
			DynamicFieldBase *&p = fields[f.first];
			if(p == NULL) {
				p = f.second->createNewWithValue(f.first.toString().c_str());
				if(pCollection != nullptr)
					p->init();
				newFieldAdded(f.first);
			}
			else
				p->setValueFrom(f.second, getTypeName());
		}
		return log(explicitTime);
	}

	StringRef getTypeName() { return metricName.name; }

	// Set all of the fields to their default values.
	void clearFields() {
		for(auto f : fields)
			f.second->clear();
	}

	uint64_t log(uint64_t explicitTime = 0);

	// Virtual function implementations
	void flushData(MetricKeyRef const &mk, uint64_t rollTime, MetricUpdateBatch &batch);
	void rollMetric( uint64_t t );
	void registerFields(MetricKeyRef const &mk, std::vector<Standalone<StringRef>>& fieldKeys);
};

// Continuous metrics are a single-field metric using an EventField<TimeAndValue<T>>
template <typename T>
struct TimeAndValue {
	TimeAndValue() : time(0), value() {}
	int64_t time;
	T value;

	// The metric field type for TimeAndValue is just the Value type.
	static inline const StringRef metric_field_type() { return metricTypeName<T>(); }
};

// FieldHeader for continuous metrics, works for T = int, double, bool
template <typename T>
struct FieldHeader<TimeAndValue<T>> {
	FieldHeader() : version(1), count(0), area(0), previous_time(0) {}
	uint8_t version;
	int64_t count;
	// If T is a floating point type then area is a double, otherwise it's an int64_t
	typename std::conditional<std::is_floating_point<T>::value, double, int64_t>::type area;
	int64_t previous_time;

	void update(FieldHeader const &h) {
		count += h.count;
		area += h.area;
	}
	void update(TimeAndValue<T> const &v) {
		++count;
		if(previous_time > 0)
			area += v.value * (v.time - previous_time);
		previous_time = v.time;
	}
	template<class Ar> void serialize(Ar &ar) {
		ar & version;
		ASSERT(version == 1);
		ar & count & area;
	}
};

template <> inline void FieldHeader<TimeAndValue<Standalone<StringRef>>>::update(TimeAndValue<Standalone<StringRef>> const &v) {
	++count;
	area += v.value.size();
}

// ValueBlock encoder/decoder for continuous metrics which have a type of TimeAndValue<T>
// Uses encodings for int64_t and T and encodes (time, value, [time, value]...)
template <typename T>
struct FieldValueBlockEncoding<TimeAndValue<T>> {
	FieldValueBlockEncoding() : time_encoding(), value_encoding() {}
	inline void write(BinaryWriter &w, TimeAndValue<T> const &v) {
		time_encoding.write(w, v.time);
		value_encoding.write(w, v.value);
	}
	TimeAndValue<T> read(BinaryReader &r) {
		TimeAndValue<T> result;
		result.time = time_encoding.read(r);
		result.value = value_encoding.read(r);
		return result;
	}
	FieldValueBlockEncoding<int64_t> time_encoding;
	FieldValueBlockEncoding<T> value_encoding;
};

// ValueBlock encoder/decoder specialization for continuous bool metrics because they are encoded
// more efficiently than encoding the time and bool types separately.
// Instead, time and value are combined to a single value (time delta << 1) + (value ? 1 : 0) and then
// that value is encoded as a delta.
template <>
struct FieldValueBlockEncoding<TimeAndValue<bool>> {
	FieldValueBlockEncoding() : prev(), prev_combined(0) {}
	inline void write(BinaryWriter &w, TimeAndValue<bool> const &v) {
		int64_t combined = (v.time << 1) | (v.value ? 1 : 0);
		w << CompressedInt<int64_t>(combined - prev_combined);
		prev = v;
		prev_combined = combined;
	}
	TimeAndValue<bool> read(BinaryReader &r) {
		CompressedInt<int64_t> d;
		r >> d;
		prev_combined += d.value;
		prev.value = prev_combined & 1;
		prev.time = prev_combined << 1;
		return prev;
	}
	TimeAndValue<bool> prev;
	int64_t prev_combined;
};

template <typename T>
struct ContinuousMetric: NonCopyable, ReferenceCounted<ContinuousMetric<T>>, MetricUtil<ContinuousMetric<T>, T>, BaseMetric {
	// Needed for MetricUtil
	static const StringRef metricType;

private:
	EventField<TimeAndValue<T>> field;
	TimeAndValue<T> tv;
	bool recorded;

public:
	ContinuousMetric(MetricNameRef const &name, T const &initial)
	  : BaseMetric(name), recorded(false) {
		tv.value = initial;
	}

	virtual void addref() { ReferenceCounted<ContinuousMetric<T>>::addref(); }
	virtual void delref() { ReferenceCounted<ContinuousMetric<T>>::delref(); }

	T getValue() const {
		return tv.value;
	}

	void flushData(const MetricKeyRef &mk, uint64_t rollTime, MetricUpdateBatch &batch) {
		if( !recorded ) {
			batch.updates.push_back(std::make_pair(mk.packLatestKey(), getLatestAsValue()));
			recorded = true;
		}

		field.flushField(mk, rollTime, batch);
	}

	void rollMetric(uint64_t t) {
		field.rollMetric(t);
	}

	Standalone<StringRef> getLatestAsValue() {
		FieldValueBlockEncoding< TimeAndValue< T > > enc;
		BinaryWriter wr(AssumeVersion(currentProtocolVersion));
		// Write a header so the client can treat this value like a normal data value block.
		// TOOD: If it is useful, this could be the current header value of the most recently logged level.
		wr << FieldHeader<TimeAndValue<T>>();
		enc.write(wr, tv);
		return wr.toStringRef();
	}

	void onEnable() {
		field.init();
		change();
	}

	void onDisable() {
		change();
	}

	void set(const T &v) {
		if(v != tv.value) {
			if(enabled)
				change();
			tv.value = v;
		}
	}

	// requires += on T
	void add(const T &delta) {
		if(delta != T()) {
			if(enabled)
				change();
			tv.value += delta;
		}
	}

	// requires ! on T
	void toggle() {
		if(enabled)
			change();
		tv.value = !tv.value;
	}

	void change() {
		uint64_t toggleTime = timer_int();
		int64_t bytes = 0;

		if(tv.time != 0) {
			double x = g_random->random01();

			int64_t l = 0;
			if (x == 0.0)
				l = FLOW_KNOBS->MAX_METRIC_LEVEL-1;
			else if (toggleTime != tv.time)
				l = std::min(
					FLOW_KNOBS->MAX_METRIC_LEVEL-1,
					(int64_t)(
						log((toggleTime - tv.time) / x) /
							FLOW_KNOBS->METRIC_LEVEL_DIVISOR
					)
				);

			if(!canLog(l))
				return;

			bool overflow = false;
			field.log(tv, tv.time, l, overflow, bytes);
			if(overflow)
				field.nextKey(toggleTime, l);
		}
		tv.time = toggleTime;
		recorded = false;
		TDMetricCollection::getTDMetrics()->checkRoll(tv.time, bytes);
	}
};

typedef ContinuousMetric<int64_t> Int64Metric;
typedef Int64Metric VersionMetric;
typedef ContinuousMetric<bool> BoolMetric;
typedef ContinuousMetric<Standalone<StringRef>> StringMetric;

// MetricHandle / EventMetricHandle are wrappers for a Reference<MetricType> which provides
// the following interface conveniences
//
//   * The underlying metric reference is always initialized to a valid object.  That valid object
//     may not actually be in a metric collection and therefore may not actually be able to write
//     data to a database, but it will work in other ways (i.e. int metrics will act like integers).
//
//   * Operator =, ++, --, +=, and -= can be used as though the handle is an object of the MetricType::ValueType of
//     the metric type for which it is a handle.
//
//   * Operator -> is defined such that the MetricHandle acts like a pointer to the underlying MetricType
//
//   * Cast operator to MetricType::ValueType is defined so that the handle will act like a MetricType::ValueType
//
//   * The last three features allow, for example, a MetricHandle<Int64Metric> to be a drop-in replacement for an int64_t.
//
template <typename T>
struct MetricHandle {
	template<typename ValueType = typename T::ValueType>
	MetricHandle(StringRef const &name = StringRef(), StringRef const &id = StringRef(), ValueType const &initial = ValueType())
	  : ref(T::getOrCreateInstance(name, id, true, initial)) {
	}

	// Initialize this handle to point to a new or existing metric with (name, id).  If a new metric is created then the handle's
	// current metric's current value will be the new metric's initial value.  This allows Metric handle users to treate their
	// Metric variables as normal variables and then bind them to actual logging metrics later while continuing with the current value.
	void init(StringRef const &name, StringRef const &id = StringRef()) {
		ref = T::getOrCreateInstance(name, id, true, ref->getValue());
	}

	void init(StringRef const &name, StringRef const &id, typename T::ValueType const &initial) {
		ref = T::getOrCreateInstance(name, id, true, initial);
	}

	void operator=(typename T::ValueType const &v) {
		ref->set(v);
	}
	void operator++() {
		ref->add(1);
	}
	void operator++(int) {
		ref->add(1);
	}
	void operator--() {
		ref->add(-1);
	}
	void operator--(int) {
		ref->add(-1);
	}
	void operator+=(typename T::ValueType const &v) {
		ref->add(v);
	}
	void operator-=(typename T::ValueType const &v) {
		ref->add(-v);
	}

	T * operator-> () { return ref.getPtr(); }

	operator typename T::ValueType () const { return ref->getValue(); }
	typename T::ValueType getValue() const  { return ref->getValue(); }

	Reference<T> ref;
};

typedef MetricHandle<Int64Metric> Int64MetricHandle;
typedef MetricHandle<VersionMetric> VersionMetricHandle;
typedef MetricHandle<BoolMetric> BoolMetricHandle;
typedef MetricHandle<StringMetric> StringMetricHandle;

template <typename E>
using EventMetricHandle = MetricHandle<EventMetric<E>>;

#include "flow/unactorcompiler.h"

#endif
